feat(tts): Add support for the Alibaba Cloud TTS providers CosyVoice TTS (API) and Qwen TTS Realtime (API), and add filtering of content in TTS text #7651
yuxwd wants to merge 13 commits into AstrBotDevs:master
Conversation
Add two new TTS providers using Alibaba Cloud DashScope SDK:
- Qwen TTS Realtime: WebSocket streaming TTS with low latency, supports qwen3-tts-flash-realtime and qwen3-tts-instruct-flash-realtime models
- CosyVoice TTS: Non-streaming TTS with multiple voice options, supports cosyvoice-v3.5/v3/v2 models

Includes config templates, provider manager integration, and i18n translations (zh-CN, en-US, ru-RU).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Code Review
This pull request introduces a TTS text filtering mechanism to strip markers like brackets and asterisks from text before synthesis, and adds support for Qwen TTS Realtime and CosyVoice TTS providers. The review feedback points out several critical issues: a logic error in the Qwen streaming implementation that causes audio duplication, a blocking call in an asynchronous function that could impact responsiveness, and incorrect usage of the DashScope SDK in the CosyVoice provider. Additionally, the FilteredQueue implementation requires a call to the base class constructor to ensure all inherited methods function correctly.
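The blocking-call concern can be illustrated with a minimal sketch. `blocking_synthesize` is a hypothetical stand-in for any synchronous SDK call, not a DashScope API; the only real API used is the standard library's `asyncio.to_thread`, which runs the call in a worker thread so the event loop keeps serving other tasks.

```python
import asyncio
import time


def blocking_synthesize(text: str) -> bytes:
    """Hypothetical stand-in for a synchronous SDK call that blocks its thread."""
    time.sleep(0.05)  # simulate network latency
    return text.encode("utf-8")


async def synthesize(text: str) -> bytes:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(blocking_synthesize, text)


async def demo() -> tuple[bytes, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets to run while synthesis is pending.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    audio = await synthesize("hello")
    beat.cancel()
    return audio, ticks
```

If `blocking_synthesize` were called directly inside the coroutine, the heartbeat would not tick until synthesis returned.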
faf411f to 0068960
…L warning log 2. FilteredQueue no longer inherits from asyncio.Queue and uses composition instead; a warning is logged when a custom regex is invalid
Code Review
This pull request introduces a TTS text filtering mechanism to remove emotion markers and brackets before synthesis, alongside adding support for Qwen TTS Realtime and CosyVoice TTS providers. The changes include a new TTSTextFilter utility, a FilteredQueue wrapper for streaming scenarios, and corresponding configuration and localization updates. Feedback identifies a logic error in the Qwen provider causing redundant text synthesis, potential issues with empty strings in the filtered queue, and suggestions for optimizing imports and correcting type hints.
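- Remove the logic in Qwen TTS Realtime get_audio_stream that re-sent accumulated_text, so text is no longer synthesized twice
- FilteredQueue.get() skips an entry that is empty after filtering and moves on to the next one, so TTS never receives empty text
- Move import struct in _pcm_to_wav to the top of the file to avoid repeated import overhead on the streaming path
- Correct the type declaration of tts_text_queue to asyncio.Queue[str | None] | FilteredQueue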
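For readers unfamiliar with the PCM-to-WAV step, here is a minimal sketch of wrapping raw PCM in a WAV container. It uses the standard library `wave` module rather than the `struct`-based header the commit refers to, and it assumes 16-bit mono audio; only the 24000 Hz sample rate appears in this thread. The function name `pcm_to_wav` is illustrative, not the PR's `_pcm_to_wav`.

```python
import io
import wave


def pcm_to_wav(pcm_data: bytes, sample_rate: int = 24000,
               channels: int = 1, sample_width: int = 2) -> bytes:
    """Wrap raw PCM samples in a WAV container (16-bit mono is an assumption)."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(sample_width)  # bytes per sample
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm_data)
    # wave does not close a file object it did not open, so the buffer is still readable.
    return buffer.getvalue()
```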
@gemini-code-assist review
Code Review
This pull request introduces a TTS text filtering mechanism to remove emotion markers (like brackets) from text before synthesis, along with support for two new TTS providers: Qwen TTS Realtime and CosyVoice TTS via the DashScope API. The filtering logic includes a new utility class and an asynchronous queue wrapper, supported by configuration updates, dashboard translations, and unit tests. Feedback suggests optimizing configuration retrieval in the result decoration stage to avoid redundant lookups and adding type hints to the new queue wrapper for better maintainability.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@gemini-code-assist review
Code Review
This pull request introduces a TTS text filtering system to remove emotion markers and bracketed content from text before speech synthesis. It also adds support for two new TTS providers, Qwen TTS Realtime and CosyVoice TTS, along with their respective configurations and localization updates. Review feedback highlights an efficiency improvement by moving configuration retrieval outside of a loop and a critical fix in the FilteredQueue to ensure task_done() is called for filtered items to prevent potential deadlocks.
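The `task_done()` point can be sketched as follows. This is not the PR's `FilteredQueue`; it is a minimal composition-based wrapper written for illustration, with an assumed filter that only drops parenthesised segments. The idea is that every item taken from the underlying queue must be acknowledged, including items the wrapper silently discards, or `queue.join()` never returns.

```python
import asyncio
import re

# Assumed filter for this sketch: parenthesised segments, ASCII or full-width.
_PAREN = re.compile(r"\([^()]*\)|（[^（）]*）")


def strip_markers(text: str) -> str:
    # Remove the segments, then collapse leftover whitespace.
    return " ".join(_PAREN.sub("", text).split())


class FilteredQueue:
    """Wraps an asyncio.Queue; filters strings on get(), passes None through."""

    def __init__(self, queue: "asyncio.Queue[str | None]") -> None:
        self._queue = queue

    async def get(self) -> "str | None":
        while True:
            item = await self._queue.get()
            if item is None:
                return None  # end-of-stream sentinel
            filtered = strip_markers(item)
            if filtered:
                return filtered
            # The item is dropped here, so acknowledge it here; otherwise
            # join() would wait for a task_done() that never comes.
            self._queue.task_done()

    def task_done(self) -> None:
        self._queue.task_done()


async def demo() -> list[str]:
    raw: asyncio.Queue = asyncio.Queue()
    for item in ["(smiles)", "Hello (waves) there", None]:
        raw.put_nowait(item)
    filtered = FilteredQueue(raw)
    spoken = []
    while True:
        text = await filtered.get()
        filtered.task_done()
        if text is None:
            break
        spoken.append(text)
    # Completes only if all three items were acknowledged.
    await asyncio.wait_for(raw.join(), timeout=1)
    return spoken
```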
@sourcery-ai review
Hey - I've found 3 issues, and left some high level feedback:
- In `cosyvoice_tts_source.py`, `SpeechSynthesizer` is imported unconditionally at module import time, unlike the guarded import pattern used in `qwen_tts_realtime_source.py`; consider wrapping this import in a `try/except ImportError` with a clear error message so environments without `dashscope` don’t crash on import.
- The built-in patterns in `TTSTextFilter.BUILTIN_PATTERNS` aggressively strip all `*...*` and `**...**` segments, which may remove legitimate markdown emphasis rather than just emotion markers; consider either narrowing these regexes or making markdown-based stripping configurable to avoid unexpected loss of content.
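A minimal sketch of the guarded import suggested in the first point. The module path and class names are the ones quoted in this review; `require_synthesizer` and the wording of the error message are hypothetical. The import failure is deferred to call time, so the module itself still loads when the SDK is absent.

```python
try:
    from dashscope.audio.tts_v2 import AudioFormat, SpeechSynthesizer
except ImportError:
    # SDK missing or too old: keep the module importable and fail later.
    AudioFormat = None
    SpeechSynthesizer = None


def require_synthesizer():
    """Hypothetical helper: raise a clear error at call time, not import time."""
    if SpeechSynthesizer is None:
        raise RuntimeError(
            "dashscope SDK is missing SpeechSynthesizer; "
            "install or upgrade the dashscope package to use CosyVoice TTS."
        )
    return SpeechSynthesizer
```

A provider method would call `require_synthesizer()` before synthesizing, so only users who actually select this provider see the error.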
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `cosyvoice_tts_source.py`, `SpeechSynthesizer` is imported unconditionally at module import time, unlike the guarded import pattern used in `qwen_tts_realtime_source.py`; consider wrapping this import in a `try/except ImportError` with a clear error message so environments without `dashscope` don’t crash on import.
- The built-in patterns in `TTSTextFilter.BUILTIN_PATTERNS` aggressively strip all `*...*` and `**...**` segments, which may remove legitimate markdown emphasis rather than just emotion markers; consider either narrowing these regexes or making markdown-based stripping configurable to avoid unexpected loss of content.
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/sources/qwen_tts_realtime_source.py" line_range="145-154" />
<code_context>
+ f.write(audio_bytes)
+ return path
+
+ async def _synthesize(self, text: str) -> bytes | None:
+ """Use CosyVoice SpeechSynthesizer to synthesize speech."""
+ loop = asyncio.get_running_loop()
</code_context>
<issue_to_address>
**issue (bug_risk):** Close the WebSocket session and use the thread-safe drain helper when assembling audio in `_synthesize`.
`_synthesize` reads `callback.audio_chunks` directly while the WebSocket thread may still be appending, bypassing the `drain_audio_chunks()` lock, and it never explicitly closes `QwenTtsRealtime`.
To align with the streaming path and avoid races/resource leaks, you could do something like:
```python
try:
await loop.run_in_executor(None, _connect_and_send)
finished = callback.wait_for_finished(timeout=self.timeout)
# ... error/timeout handling ...
pcm_data = b"".join(callback.drain_audio_chunks())
if not pcm_data:
return None
return self._pcm_to_wav(pcm_data, sample_rate=24000)
finally:
try:
await loop.run_in_executor(None, qwen_tts.close)
except Exception:
pass
```
This keeps audio access thread-safe and ensures the WebSocket is closed even on error paths.
</issue_to_address>
### Comment 2
<location path="astrbot/core/provider/sources/cosyvoice_tts_source.py" line_range="16" />
<code_context>
+import os
+import uuid
+
+from dashscope.audio.tts_v2 import AudioFormat, SpeechSynthesizer
+
+from astrbot.core import logger
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard the CosyVoice DashScope import similarly to the Qwen provider to avoid hard ImportError.
This provider imports `SpeechSynthesizer` at module import time without a `try/except`, so if the DashScope TTS v2 API or `audio.tts_v2` isn’t available, importing this module will raise and can break provider loading.
To align with Qwen and fail gracefully, wrap the import in a `try/except ImportError`, set `AudioFormat` and `SpeechSynthesizer` to `None` on failure, and then have `_synthesize` / `get_audio` raise a clear `RuntimeError` when the SDK capability is missing.
</issue_to_address>
### Comment 3
<location path="astrbot/core/utils/tts_text_filter.py" line_range="52" />
<code_context>
+ return result.strip()
+
+
+class FilteredQueue:
+ """异步队列包装器,在 get() 时自动过滤文本。
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the speculative FilteredQueue wrapper with a simple filtered_get coroutine to keep behavior explicit and avoid a partial queue-like abstraction.
You can keep the text‑filter functionality while avoiding the speculative `FilteredQueue` abstraction and its partial queue façade.
Instead of wrapping `asyncio.Queue` in a class that shadows part of its API and hides the drop behavior inside `get()`, extract a focused helper coroutine that can be used wherever streaming is actually needed:
```python
class TTSTextFilter:
...
@classmethod
def apply(cls, text: str, custom_rules: list[str] | None = None) -> str:
...
return result.strip()
```
Replace `FilteredQueue` with:
```python
async def filtered_get(
queue: asyncio.Queue,
custom_rules: list[str] | None = None,
) -> str | None:
"""从队列中获取一个元素;如果是字符串则应用 TTS 过滤。
返回值:
- None: 队列中取出了 None,呼叫方可认为是结束信号。
- 非空字符串: 过滤后的文本。
- 其他类型: 原样返回(例如用于控制消息)。
"""
while True:
item = await queue.get()
if item is None:
return None
if isinstance(item, str):
filtered = TTSTextFilter.apply(item, custom_rules)
if filtered:
return filtered
# 空字符串时继续从队列中取下一个
continue
return item
```
Usage in a future streaming TTS consumer stays explicit and predictable:
```python
while True:
text = await filtered_get(raw_text_queue, custom_rules)
if text is None:
break
# 使用过滤后的 text 进行 TTS
```
This keeps:
- All current filtering behavior.
- No partial queue interface to maintain.
- No surprising `get()` semantics on a “queue‑like” object; the drop/loop behavior is clear at the call site.
</issue_to_address>
    async def _synthesize(self, text: str) -> bytes | None:
        """Use Qwen TTS Realtime WebSocket API to synthesize speech."""
        if QwenTtsRealtime is None:
            raise RuntimeError(
                "dashscope SDK missing QwenTtsRealtime. "
                "Please upgrade the dashscope package to use Qwen TTS Realtime.",
            )

        callback = _QwenRealtimeCallback()
        model = self.get_model()
issue (bug_risk): Close the WebSocket session and use the thread-safe drain helper when assembling audio in `_synthesize`.
`_synthesize` reads `callback.audio_chunks` directly while the WebSocket thread may still be appending, bypassing the `drain_audio_chunks()` lock, and it never explicitly closes `QwenTtsRealtime`.
To align with the streaming path and avoid races/resource leaks, you could do something like:

    try:
        await loop.run_in_executor(None, _connect_and_send)
        finished = callback.wait_for_finished(timeout=self.timeout)
        # ... error/timeout handling ...
        pcm_data = b"".join(callback.drain_audio_chunks())
        if not pcm_data:
            return None
        return self._pcm_to_wav(pcm_data, sample_rate=24000)
    finally:
        try:
            await loop.run_in_executor(None, qwen_tts.close)
        except Exception:
            pass

This keeps audio access thread-safe and ensures the WebSocket is closed even on error paths.
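What a lock-protected drain helper might look like, as a self-contained sketch. `AudioCollector` is hypothetical and is not the PR's `_QwenRealtimeCallback`; a plain thread stands in for the WebSocket thread, and `worker.join()` stands in for closing the session in the `finally` block.

```python
import threading


class AudioCollector:
    """Hypothetical callback object: a worker thread appends, the caller drains."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._chunks: list[bytes] = []
        self._finished = threading.Event()

    def on_audio(self, chunk: bytes) -> None:
        with self._lock:
            self._chunks.append(chunk)

    def on_finished(self) -> None:
        self._finished.set()

    def wait_for_finished(self, timeout: float) -> bool:
        return self._finished.wait(timeout)

    def drain_audio_chunks(self) -> list[bytes]:
        # Swap the list under the lock so no append can interleave with the read.
        with self._lock:
            chunks, self._chunks = self._chunks, []
        return chunks


def collect(chunks_to_send: list[bytes], timeout: float = 1.0) -> bytes:
    collector = AudioCollector()

    def producer() -> None:
        for chunk in chunks_to_send:
            collector.on_audio(chunk)
        collector.on_finished()

    worker = threading.Thread(target=producer)
    worker.start()
    try:
        if not collector.wait_for_finished(timeout):
            raise TimeoutError("synthesis did not finish in time")
        return b"".join(collector.drain_audio_chunks())
    finally:
        worker.join()  # stands in for closing the WebSocket session
```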



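Modifications
TTS providers added
The project's built-in Alibaba Cloud TTS provider support is incomplete; this change adds support for CosyVoice TTS (API) and Qwen TTS Realtime (API).
Filtering content in TTS text
Improves the TTS the bot sends by adding a feature that filters content out of TTS text, so that TTS does not read the content inside parentheses (); regex-based filtering is supported.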
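A minimal sketch of the kind of filtering described here, assuming a single built-in rule that removes parenthesised text plus optional user-supplied regexes. The names `filter_tts_text` and `BUILTIN_PATTERNS` and the exact patterns are illustrative and are not taken from the PR's `TTSTextFilter`.

```python
import logging
import re
from typing import Sequence

logger = logging.getLogger(__name__)

# Assumed built-in rule for this sketch: parenthesised text, ASCII or full-width.
BUILTIN_PATTERNS = [re.compile(r"\([^()]*\)|（[^（）]*）")]


def filter_tts_text(text: str, custom_rules: Sequence[str] = ()) -> str:
    """Remove built-in and user-supplied patterns, then tidy whitespace."""
    patterns = list(BUILTIN_PATTERNS)
    for rule in custom_rules:
        try:
            patterns.append(re.compile(rule))
        except re.error as exc:
            # An invalid user regex is skipped with a warning rather than raised.
            logger.warning("Ignoring invalid TTS filter rule %r: %s", rule, exc)
    for pattern in patterns:
        text = pattern.sub("", text)
    return " ".join(text.split())
```

For example, `filter_tts_text("Hello (waves) there")` returns `"Hello there"`, and a custom rule such as `r"\[[^\]]*\]"` additionally removes square-bracketed segments.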
Code testing
Tested on macOS and Linux (Alibaba Cloud Linux 3.2104 LTS 64-bit) with no issues.




Screenshots or Test Results
Checklist
- 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
- 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
- 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
- 😮 My changes do not introduce malicious code.
Summary by Sourcery
Add new Alibaba Cloud TTS providers and introduce configurable text filtering before TTS synthesis.